RDD
RDD全称叫做弹性分布式数据集(Resilient Distributed Datasets),它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他RDD转换而创建,为此,RDD支持丰富的转换操作(如map, join, filter, groupBy等),通过这种转换操作,新的RDD则包含了如何从其他RDDs衍生所必需的信息,所以说RDDs之间是有依赖关系的。基于RDDs之间的依赖,RDDs会形成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区,总结起来,基于RDD的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的DAG,然后写回稳定存储。另外RDD还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。可以说Spark最初也就是实现RDD的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,Spark-RDD的关系类似于Hadoop-MapReduce关系。
RDD 特性
- 由一系列分区(分片)组成
- 对一个 RDD 进行操作,其实就是对一个 RDD 中的所有分区进行操作
- RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的
pyspark
1 | vim ~/.bash_profile |
创建 RDD
将简单的 Python 数据转为 RDD
1 | 1, 2, 3, 4, 5] data = [ |
外部存储数据转换为 RDD
格式只要是 Hadoop 支持的,这里都支持。例如 本地文件系统,HDFS,Cassnadra,HBase,Amazon S3 ,文件文件,二进制文件等。
- 将本地文件转为 RDD
1 | "file:///home/hadoop/data/hello.txt").collect() sc.textFile( |
除了数据文件路径外,我们还可以传递一个目录,或者进行模式匹配,例如 textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz")
textFile 和 parallelize 一样也支持第二个参数来进行分区,默认情况下,Spark为文件的每个块创建一个分区(HDFS中默认为128MB),也可以通过传递更大的值来请求更多的分区
- hdfs
1 | 在命令行中,将刚刚创建的 hello.txt 推送到 hdfs 中去 |
1 | "hdfs://192.168.11.111:8020/hello.txt").collect() sc.textFile( |
开发 pyspark 应用程序
打开 Pychram 设置环境变量
内容为 spark-2.3.0-bin-2.6.0-cdh5.7.0 包中的 python 的目录和 python 的上一级目录
将我们制作的 spark-2.3.0-bin-2.6.0-cdh5.7.0 包中的 python/lib下的 py4j-0.10.6-src.zip 和 pyspark.zip 选中导入。这样就不需要安装 pyspark 和 py4j 包了。
编写测试程序1
2
3
4
5
6
7
8
9
10
11
12
13from pyspark import SparkContext, SparkConf
# 创建 SparkConf, 设置 Spark 相关参数信息
conf = SparkConf().setAppName("spark").setMaster("local[2]")
# 创建 SparkContext
sc = SparkContext(conf=conf)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
print(distData.collect())
sc.stop()
在服务器环境中运行1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19mkdir ~/script
vim /home/hadoop/script/spark_test.py
内容如下
from pyspark import SparkContext, SparkConf
conf = SparkConf()
sc = SparkContext(conf=conf)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
print(distData.collect())
sc.stop()
提交 pyspark 应用程序
cd /home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0/bin
./spark-submit --master local[2] --name spark_test /home/hadoop/script/spark_test.py